package main

import (
	"context"
	"fmt"
	"gitee.com/ifinder/finder-kafka-go/kafkax/consumer"
	"github.com/Shopify/sarama"
	cluster "github.com/cnfinder/sarama-cluster"
	"os"
	"strings"
)

// main runs the finder-kafka wrapper example consumer (D).
func main() {
	D()
}


//=========== ConsumerGroup approach


// exampleConsumerGroupHandler implements sarama.ConsumerGroupHandler with
// no-op lifecycle hooks: it simply logs every message and marks it consumed.
type exampleConsumerGroupHandler struct{}

// Setup runs before consuming begins; nothing to initialize here.
func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup runs after the session ends; nothing to tear down here.
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim drains the claim's message channel, printing each record and
// marking it so its offset is eventually committed.
func (h exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	messages := claim.Messages()
	for message := range messages {
		fmt.Printf("Message topic:%q partition:%d offset:%d\n", message.Topic, message.Partition, message.Offset)
		session.MarkMessage(message, "")
	}
	return nil
}

// E consumes using sarama's native ConsumerGroup API. Consume must be invoked
// in a loop because every server-side rebalance ends the current session.
func E() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // must match (or precede) the broker version
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.AutoCommit.Enable = true

	group, err := sarama.NewConsumerGroup([]string{"mq.cps.cvod.net:9092"}, "group-default", config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = group.Close() }()

	// Track errors (required reading when Consumer.Return.Errors is true).
	go func() {
		for err := range group.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

	// Iterate over consumer sessions.
	ctx := context.Background()
	topics := []string{"msg_order_search_update"}
	handler := exampleConsumerGroupHandler{}
	for {
		// `Consume` should be called inside an infinite loop: when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims.
		if err := group.Consume(ctx, topics, handler); err != nil {
			panic(err)
		}
		// Stop iterating once the context has been cancelled; otherwise the
		// loop would spin forever calling Consume with a dead context.
		if ctx.Err() != nil {
			return
		}
	}
}

//=============




// D consumes through the project's finder-kafka wrapper, registering
// callbacks for errors, rebalance notifications, and messages.
func D() {
	// NOTE(review): this assumes consumer.GetConfig returns the shared config
	// instance that NewFinderKafkaConsumer later reads — otherwise the
	// OffsetOldest setting below is silently lost. TODO confirm against the
	// consumer package.
	config := consumer.GetConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	cs, err := consumer.NewFinderKafkaConsumer(
		[]string{"mq.t.xingkonglian.net:9092"},
		"test-consumer-group",
		[]string{"msg_order_search_update"},
	)
	if err != nil {
		panic(err)
	}

	// Consume blocks, dispatching to the error, notification, and message
	// callbacks respectively.
	cs.Consume(func(err error) {
		fmt.Println(err)
	}, func(notification *cluster.Notification) {
		fmt.Printf("%+v", notification)
	}, func(message *sarama.ConsumerMessage) {
		fmt.Println(message)
	})
}


// C consumes through the project's finder-kafka wrapper using the channel
// based API: errors, rebalance notifications, and messages are each drained
// from their own channel.
func C() {
	config := consumer.GetConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	// BUG FIX: the constructor error was previously discarded with `_`, so a
	// failed connection would crash below with a nil dereference instead of a
	// clear panic message.
	cs, err := consumer.NewFinderKafkaConsumer([]string{"mq.cps.cvod.net:9092"}, "test-consumer-group", []string{"msg_order_search_update"})
	if err != nil {
		panic(err)
	}

	go func() {
		for err := range cs.Consumer.Errors() {
			fmt.Println("Error: ", err.Error())
		}
	}()

	go func() {
		for note := range cs.Consumer.Notifications() {
			fmt.Println(note)
		}
	}()

	// Delivery-semantics refresher:
	// 1) at most once:  fire-and-forget; never redelivered on failure.
	// 2) at least once: redelivered until acknowledged; duplicates possible.
	// 3) exactly once:  delivered exactly one time.
	// We want (2): the business logic must succeed AND the offset must be
	// persisted (e.g. written to redis) before the message counts as done.
	for msg := range cs.Consumer.Messages() { // this also receives messages from earlier offsets
		fmt.Println(msg)
	}
}

// B consumes directly through sarama-cluster, multiplexing notifications,
// errors, and messages in a single select loop.
func B() {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	// OffsetOldest replays already-retained messages; OffsetNewest would only
	// see messages produced after this consumer joins.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	cs, err := cluster.NewConsumer([]string{"mq.cps.cvod.net:9092"}, "group-01", []string{"msg_order_search_update"}, config)
	if err != nil {
		panic(err)
	}
	// BUG FIX: the consumer was never closed; ensure its connections are
	// released if the loop ever exits (e.g. via panic).
	defer cs.Close()

	for {
		select {
		case n := <-cs.Notifications():
			fmt.Println(n)
		case err := <-cs.Errors():
			fmt.Println(err)
		case m := <-cs.Messages():
			fmt.Printf("msg:%+v", m)
		}
	}
}

// A consumes via sarama-cluster with dedicated goroutines for errors and
// rebalance notifications, marking each message's offset after printing it.
func A() {
	groupID := "group-1"
	topicList := "topic_aaa"
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// Start from the OLDEST retained offset. (The previous comment claimed
	// "newest", contradicting the code — the code wins.)
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	c, err := cluster.NewConsumer(strings.Split("mq.cps.cvod.net:9092", ","), groupID, strings.Split(topicList, ","), config)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer c.Close()
	go func() {
		for err := range c.Errors() {
			fmt.Println(err.Error())
		}
	}()

	go func() {
		for note := range c.Notifications() {
			fmt.Printf("Rebalanced: %+v\n", note)
		}
	}()

	for msg := range c.Messages() {
		fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
		// MarkOffset does not write to kafka immediately; offsets not yet
		// flushed can be lost if the process crashes.
		c.MarkOffset(msg, "")
	}
}